package com.m.man.queue.kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

//@Component
@Service
public class KafkaProducer implements KafkaProducerInterface {

    private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    private Gson gson = new GsonBuilder().create();

    final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaSendResultHandler producerListener;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    //为kafka添加事务
    @Transactional
    //发送消息方法
    public void send(String topic, String msg) throws ExecutionException, InterruptedException, TimeoutException {
        System.out.println("生产者发送:" + topic + "+++++++++++++++++++++++++++>>>>" + msg);

        //消息结果回调配置
        kafkaTemplate.setProducerListener(producerListener);

        //默认同步发送
        kafkaTemplate.send(topic, msg);


        /*
        //同步发送   当send方法耗时大于get方法所设定的参数时会抛出一个超时异常，
        但需要注意，这里仅抛出异常，消息还是会发送成功的
        kafkaTemplate.send(topic, "test send message timeout").get(1, TimeUnit.MICROSECONDS);
        */

        //异步发送 发送时间较长的时候会导致进程提前关闭导致无法调用回调函数
        Thread.sleep(1000);
    }
}
